package com.rtsapp.server.domain.mysql.hibernate.impl;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.rtsapp.server.logger.Logger;


/**
 * Worker that executes queued {@link AysncOperation}s one at a time, each inside
 * its own begin/commit (or rollback) cycle on a single {@link DirectTransaction}.
 *
 * <p>Producers hand operations over with {@link #put(AysncOperation)}; a thread
 * created by {@link #start()} consumes them in {@link #run()}. After
 * {@link #stop()} the worker keeps running until the queue is empty and then exits.
 */
public class AsyncTransactionWorker implements Runnable {

	private static final Logger logger =com.rtsapp.server.logger.LoggerFactory.getLogger( AsyncTransactionWorker.class );

	/** How long the worker waits for an operation before re-checking the stop flag. */
	private static final long POLL_INTERVAL_MILLIS = 200L;

	/** Bounded, fair (FIFO among blocked threads) queue of pending operations. */
	private final BlockingQueue< AysncOperation > aysncTransactionQueue;
	/** The single transaction object reused for every operation. */
	private final DirectTransaction directTransaction;
	/** Set by stop() on a caller thread and read by the worker thread, hence volatile. */
	private volatile boolean isStop = false;

	private Thread aysncTransactionThread = null;

	/** Number of operations successfully enqueued. */
	private final AtomicLong aysncPutTimes;
	/** Number of operations successfully dequeued by the worker. */
	private final AtomicLong aysncTakeTimes;


	/**
	 * Creates a worker with a bounded queue.
	 *
	 * @param asyncQueueSize capacity of the operation queue; {@link ArrayBlockingQueue}
	 *                       rejects values below 1 with {@link IllegalArgumentException}
	 */
	public AsyncTransactionWorker( int asyncQueueSize ){
		this.aysncTransactionQueue = new ArrayBlockingQueue< AysncOperation >( asyncQueueSize, true );
		this.directTransaction = new DirectTransaction( );
		this.aysncPutTimes = new AtomicLong( 0 );
		this.aysncTakeTimes = new AtomicLong( 0 );
	}


	/**
	 * Starts a new thread that runs this worker.
	 *
	 * <p>NOTE(review): every call creates another thread sharing the same
	 * {@link DirectTransaction}; whether that is safe depends on that class,
	 * so this is presumably meant to be called once — confirm with callers.
	 */
	public void start( ){

		aysncTransactionThread = new Thread( this );
		aysncTransactionThread.start();

		logger.info("aysncTransactionThread线程启动");
	}


	/**
	 * Requests shutdown. The worker finishes the operations already queued and
	 * exits once it finds the queue empty, at most about
	 * {@value #POLL_INTERVAL_MILLIS} ms after the queue drains.
	 */
	public void stop( ){
		isStop = true;
	}


	/**
	 * Enqueues an operation, blocking while the queue is full.
	 *
	 * <p>If the calling thread is interrupted while waiting, the operation is
	 * NOT enqueued, the error is logged, and the thread's interrupt status is
	 * restored so the caller can observe it.
	 *
	 * @param op operation to execute asynchronously; must not be {@code null}
	 *           (the underlying queue throws {@link NullPointerException})
	 */
	public void put( AysncOperation op ){

		try {
			aysncTransactionQueue.put( op );
			// Count only after the hand-off succeeded so the counter cannot drift
			// when put() fails for any reason.
			aysncPutTimes.incrementAndGet();

		} catch (InterruptedException e) {

			// Do not swallow the interrupt: the caller may rely on it.
			Thread.currentThread().interrupt();

			logger.error( " put AysncOperation error ", e );
		}

	}


	/**
	 * Consumer loop: repeatedly takes an operation and executes it in a
	 * transaction, until {@link #stop()} has been called and the queue is empty.
	 */
	@Override
	public void run() {

		while ( true ) {

			try {

				// Timed poll instead of take(): an indefinitely blocking take()
				// would never notice a stop request issued while the queue is empty.
				AysncOperation task = aysncTransactionQueue.poll( POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS );

				if ( task != null ) {
					aysncTakeTimes.incrementAndGet();
					executeInTransaction( task );
				} else if ( isStop && aysncTransactionQueue.isEmpty() ) {
					break;
				}

			} catch ( InterruptedException e ) {
				// Deliberately consumed: the loop terminates only through stop(),
				// so that already queued operations are still executed.
				logger.error( "runnableQueue take error", e  );
			} catch ( Throwable e) {
				// Keep the worker alive whatever a single iteration throws.
				logger.error( "AysncTransactionQueueThread run error", e );
			}

		}

		logger.info( "AysncTransactionQueueThread exit " );

	}


	/** Runs one operation between begin and commit, rolling back on any failure and always closing. */
	private void executeInTransaction( AysncOperation task ) {

		try {

			directTransaction.begin( );

			task.execute( directTransaction );

			directTransaction.commit();

		} catch ( Throwable e) {

			// Log first: if rollback() itself throws, the root cause must not be lost.
			logger.error( " AysncTransactionQueueThread directTransaction execute error", e  );

			try {
				directTransaction.rollback();
			} catch ( Throwable rollbackError ) {
				logger.error( " AysncTransactionQueueThread directTransaction rollback error", rollbackError );
			}

		} finally {
			directTransaction.close( );
		}

	}

}
